{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Apache Beam Notebooks for Streaming NLP on Real-time Tweets"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this demo we will walk through setting up a local client to gather tweets using the `tweepy` API. After that we will using the interactive runner in Apache Beam notebooks to build a pipeline to do natural language processing on tweets in real-time. One of the advantages of using the interactive runner is that we can explore the intermediate outputs for our pipeline while building the pipeline!\n",
    "\n",
    "At the end of the notebook we will turn the relevant parts of the notebook into a script where we can deploy our streaming pipeline on Cloud Dataflow.\n",
    "\n",
    "First, let us look at the script we will be using to gather our tweets and publish them to Pub/Sub."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "!cat tweet-setup.sh"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "After installing some packages, we will run the `tweets-gatherer.py` script. This script will not be covered explicitly in the demo, but it is recommended to glance through the code and see how the Tweepy API and Pub/Sub client are being used. \n",
    "\n",
    "Note that you need to have a Twitter Developer Account to run this script. The free version of the account will suffice and you can sign up here. We need the the Twitter API Consumer Key/Secret and the Twitter API Access Key/Secret for our client to be able to search and pull tweets in real time. These tweets will be published to a Pub/Sub topic in your project created by the script above.\n",
    "\n",
    "Before moving forward, insert your Twitter Developer API keys, open a terminal (File > New > Terminal) and run the command `bash tweet-setup.sh`. If you already have a Pub/Sub topic named `tweet-nlp-demo` or a BigQuery dataset named `tweet_nlp_demo` then you can ignore the corresponding error messages."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before we begin to build our pipeline, we need to install a couple of Python client libraries. After doing this, you should reset the notebook kernel (Kernel > Restart Kernel) so that the packages are properly picked up. It may take a few minutes to install the packages."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "%pip install google-cloud-translate google-cloud-language"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We will start by importing the packages that we need for the notebook. The first code block contains packages that we will need when we submit the pipeline to Dataflow, so we will want to include the code cell in the exported script. **Before running the cell, be sure to change the Project ID to your own**. The rest of the variables (`OUTPUT_DATASET`, `OUTPUT_TABLE_UNAGG`,`OUTPUT_TABLE_AGG`, and `INPUT_TOPIC`) refer to objects created within the lab."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import argparse, os, json, logging\n",
    "from datetime import datetime, timedelta\n",
    "import json\n",
    "import pandas as pd\n",
    "\n",
    "import apache_beam as beam\n",
    "from apache_beam.transforms import trigger\n",
    "from apache_beam.io.gcp.internal.clients import bigquery\n",
    "from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions, SetupOptions, StandardOptions\n",
    "\n",
    "import google.auth\n",
    "from google.cloud import language_v1\n",
    "from google.cloud.language_v1 import enums\n",
    "from google.cloud import translate_v2 as translate\n",
    "\n",
    "print('Beam Version:', beam.__version__)\n",
    "\n",
    "PROJECT_ID = 'your-project-id-here' #TODO: CHANGE PROJECT ID\n",
    "OUTPUT_DATASET = 'tweet_nlp_demo'\n",
    "OUTPUT_TABLE_UNAGG = 'processed_tweet_data'\n",
    "OUTPUT_TABLE_AGG = 'aggregated_tweet_data'\n",
    "INPUT_TOPIC = \"projects/{}/topics/tweet-nlp-demo\".format(PROJECT_ID)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "However, the next cell contains code to import the interactive runner we will use to explore the pipeline within the notebook. We do not want to include this in the final script so we will annotate it as such."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "from apache_beam.runners.interactive import interactive_runner\n",
    "import apache_beam.runners.interactive.interactive_beam as ib"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next we define our pipeline options. Since we wish to deal with data in real-time, we will set the streaming option to `True` to ensure that the pipeline runs indefinitely. The behavior differs slightly when we wish to use the interactive runner, but we will address that in just a moment."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Setting up the Beam pipeline options.\n",
    "options = PipelineOptions()\n",
    "\n",
    "# Sets the pipeline mode to streaming, so we can stream the data from PubSub.\n",
    "options.view_as(StandardOptions).streaming = True\n",
    "\n",
    "# Sets the project to the default project in your current Google Cloud environment.\n",
    "# The project will be used for creating a subscription to the PubSub topic.\n",
    "_, options.view_as(GoogleCloudOptions).project = google.auth.default()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we set up our interactive runner. Note that we're setting a capture duration of 60 seconds. Instead of waiting indefinitely for more data to come in, we will collect 60 seconds worth of data and load it into an in-memory PCollection. That way we can visualize the results one transform at a time while building our pipeline. When we run the pipeline in Dataflow, we will want to run the pipeline indefintely. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "ib.options.capture_duration = timedelta(seconds=60)\n",
    "p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=options)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**DO NOT RUN THE NEXT CELL IN THE NOTEBOOK!!!** The next cell defines all of the options for running the pipeline on Dataflow and we do not want to run this in the notebook. The cell is left here (uncommented) so that it will properly be included when we run `nbconvert` after exploring our pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from apache_beam.runners import DataflowRunner\n",
    "options.view_as(StandardOptions).runner = 'DataflowRunner'\n",
    "google_cloud_options = options.view_as(GoogleCloudOptions)\n",
    "google_cloud_options.job_name = 'tweet-nlp-pipeline'\n",
    "google_cloud_options.staging_location = 'gs://{}/binaries'.format(PROJECT_ID)\n",
    "google_cloud_options.temp_location = 'gs://{}/temp'.format(PROJECT_ID)\n",
    "google_cloud_options.region = 'us-central1'\n",
    "p = beam.Pipeline(DataflowRunner(), options=options)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we are ready to start building our pipeline! We start by reading in tweets from our Pub/Sub topic using the `ReadFromPubSub` connector. After that we will use the `json.loads` function to parse the incoming JSON blob containing the text of the tweet and its attributes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# So that Pandas Dataframes do not truncate data...\n",
    "pd.set_option('display.max_colwidth', -1)\n",
    "\n",
    "tweets = p | 'ReadTweet' >> beam.io.gcp.pubsub.ReadFromPubSub(topic=INPUT_TOPIC) | beam.Map(json.loads)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "What we did in the previous cell was add two transformations to our pipelines DAG (Directed Acyclic Graph). We have not processed any data yet! We can use `ib.show` to ingest data from our Pub/Sub topic for 60 seconds (per our `capture_duration` option above) and store the data in an in-memory PCollection, we then apply `json.loads` to the elements of the PCollection and can visualize the results via Pandas. \n",
    "\n",
    "**WARNING:** The incoming tweets are (unfiltered) tweets containing the search term \"pizza\". Though the search term was chosen to be as uncontroversial as possible, anything could be in these tweets. Of course, this includes possibly very offensive material."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "ib.show(tweets)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we can see the JSON blobs sent to Pub/Sub by the Twitter API. However we are only going to want certain properties of the messages for our goal. Let's take the \"text\", \"created_at\" and \"source\" fields for each message and pack them into a dictionary. We will create a custom function `parse_fields` and apply it in our pipeline once again using `beam.Map`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def parse_fields(tweet):\n",
    "    \n",
    "    trim = {}\n",
    "    \n",
    "    trim['text'] = tweet['messages'][0]['data']['text']\n",
    "    trim['created_at'] = tweet['messages'][0]['data']['created_at']                       \n",
    "    trim['source']=tweet['messages'][0]['data']['source']\n",
    "    return trim\n",
    "\n",
    "parsed_tweets = tweets | \"Parse_Tweet\" >> beam.Map(parse_fields)"
   ]
  },
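  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the nesting that `parse_fields` relies on concrete, here is a minimal sketch that performs the same lookups on a hand-written message. The sample payload (including its extra `id` field) is invented for illustration; the real messages published by `tweets-gatherer.py` may look different.\n",
    "\n",
    "```python\n",
    "# Hypothetical payload shaped like the messages parsed in this notebook.\n",
    "sample = {\n",
    "    'messages': [\n",
    "        {'data': {'text': 'I love pizza',\n",
    "                  'created_at': '2020-06-01T12:00:05',\n",
    "                  'source': 'Twitter Web App',\n",
    "                  'id': 1}}\n",
    "    ]\n",
    "}\n",
    "\n",
    "def parse_fields(tweet):\n",
    "    # Keep only the three fields used later in the pipeline.\n",
    "    data = tweet['messages'][0]['data']\n",
    "    return {key: data[key] for key in ('text', 'created_at', 'source')}\n",
    "\n",
    "trimmed = parse_fields(sample)\n",
    "print(trimmed)\n",
    "```\n",
    "\n",
    "Any field that is not listed explicitly, such as `id` in the sample, is dropped."
   ]
  },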
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let us quickly use `ib.show` again to see the results of our parsing. Note that the output of the previous steps is still in an in-memory PCollection, so we do not have to wait a minute for data to come in through the Pub/Sub IO Connection again."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "ib.show(parsed_tweets)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that the dictionaries are parsed by the interactive runner so that when we visualize the data it is presented as a table. Before we move on, we can use the `ib.show_graph` to visualize our pipeline. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "ib.show_graph(p)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can see the transforms (in boxes) with the cell numbers corresponding to them. In the circles between the tranforms, we can see the names of the corresponding PCollections. Note that between the `ReadTweet` and the `Map(loads)` transforms the name was generated by Beam since we did not assign a name ourselves.\n",
    "\n",
    "Now we are ready to begin applying machine learning to the data. The NLP (Natural Language Processing) API only supports certain languages for sentiment analysis. So, what we will do is first use the Translation API to detect the language. We will create a Python function, `detect_language`, to call the Translation API and add it to our pipeline once again using `beam.Map`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def detect_language(tweet):\n",
    "    \n",
    "    translate_client = translate.Client()\n",
    "    \n",
    "    text = tweet['text']\n",
    "    result = translate_client.detect_language(text)\n",
    "    \n",
    "    tweet['language'] = result['language']\n",
    "    tweet['lang_confidence'] = result['confidence']\n",
    "    \n",
    "    return tweet\n",
    "\n",
    "lang_tweets = parsed_tweets | \"Detect_Language\" >> beam.Map(detect_language)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let us now detect the language of our tweets. Note that we will also record the confidence in the API's predictions ('lang_confidence') for later reference."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "ib.show(lang_tweets)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we are ready to perform sentiment analysis on our tweets. We will invoke the NLP API to analyze the sentiment of tweets involving the term \"pizza\". Note that the translation of \"pizza\" is \"pizza\" in many languages, including French, German, Itaian, Portugese, and Spanish. These are lanaguages that are supported by the NLP API, so we will will filter based off the language detected by the Translation API. In the case that we are not working with one of these languages, we will assign a `None` value to the score and magnitude fields.\n",
    "\n",
    "As in the previous steps, we will invoke the API using a function and then call the function in our pipeline using `beam.Map`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def analyze_sentiment(tweet):\n",
    "\n",
    "    client = language_v1.LanguageServiceClient()\n",
    "\n",
    "    type_ = enums.Document.Type.PLAIN_TEXT\n",
    "\n",
    "    if tweet['language'] in ['en', 'fr', 'de', 'it', 'pt', 'es']:\n",
    "        \n",
    "        language = tweet['language']\n",
    "        document = {\"content\": tweet['text'], \"type\": type_, \"language\": language}\n",
    "\n",
    "        encoding_type = enums.EncodingType.UTF8\n",
    "\n",
    "        response = client.analyze_sentiment(document, encoding_type=encoding_type)\n",
    "        \n",
    "        tweet['score'] = response.document_sentiment.score\n",
    "        tweet['magnitude'] = response.document_sentiment.magnitude\n",
    "    \n",
    "    else:\n",
    "        \n",
    "        tweet['score'] = None\n",
    "        tweet['magnitude'] = None\n",
    "    \n",
    "    return tweet\n",
    "        \n",
    "analyzed_tweets = lang_tweets | \"Analyze_Tweets\" >> beam.Map(analyze_sentiment)   "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And as before, let us take a look into our processed tweets by using `ib.show`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "ib.show(analyzed_tweets, include_window_info=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We now have all of the information that we need to start performing our aggregations. However, there's one more thing we should address first. The date-timestamp (DTS) that Dataflow uses by default is the Pub/Sub publication time (when using the `ReadFromPubSub` connector). However, we would rather sort the tweets in the context of when they are published to Twitter. Above we can see that the `event_time` field and the `created_at` times are slightly different. We can replace the timestamp with the one in the `created_at` field."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def custom_timestamp(tweet):\n",
    "    ts = datetime.strptime(tweet[\"created_at\"], \"%Y-%m-%dT%H:%M:%S\")\n",
    "    return beam.window.TimestampedValue(tweet, ts.timestamp())\n",
    "\n",
    "analyzed_tweets_w_dts = analyzed_tweets | 'CustomTimestamp' >> beam.Map(custom_timestamp)"
   ]
  },
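  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One detail worth knowing about this conversion: calling `.timestamp()` on a naive `datetime` interprets it in the local time zone of the machine running the code. The sketch below shows one way to make the result independent of the host by attaching UTC explicitly. It assumes that `created_at` is expressed in UTC, which this notebook does not confirm, and the sample value is made up.\n",
    "\n",
    "```python\n",
    "from datetime import datetime, timezone\n",
    "\n",
    "created_at = '2020-06-01T12:00:05'  # made-up value in the format parsed above\n",
    "\n",
    "naive = datetime.strptime(created_at, '%Y-%m-%dT%H:%M:%S')\n",
    "\n",
    "# Attach UTC (an assumption about the data) before converting to\n",
    "# seconds since the Unix epoch.\n",
    "event_time = naive.replace(tzinfo=timezone.utc).timestamp()\n",
    "print(event_time)\n",
    "```"
   ]
  },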
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "ib.show(analyzed_tweets_w_dts, include_window_info=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In our example here we will group our data into sliding windows of length 30 seconds and starting every 10 seconds. We do this by using the `beam.WindowInto` transform and specifying the window type, length, and offset using `beam.window.SlidingWindows`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "windowed_tweets = analyzed_tweets_w_dts | \"Window\" >> beam.WindowInto(beam.window.SlidingWindows(30, 10))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "What does this actually do to our data in out PCollection? The best thing to do here is go ahead and take a peek into the output of the pipeline up to this point using `ib.show`. We will set the `include_window_info` flag to `True` so that we can peek into how windows are assigned."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "ib.show(windowed_tweets, include_window_info=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Did you notice something above? Every tweet is now triplicated, with one entry for each window it belongs to. Another thing to notice is that we have simply *assigned* the windows at this point, the data has not been grouped into windows yet.\n",
    "\n",
    "We want to measure sentiment over time depending on the source of the tweet. To do this, let us create a \"key-value\" pair for each tweet. Strictly speaking, we do not have a key-value pair construction in Python, but Beam will treat the first value of an ordered pair as a \"key\" and the second value of the ordered pair as the \"value\".\n",
    "\n",
    "The key will be the source of the tweet and the value will be a dictionary of the score and magnitude of the tweet. We will be using both of these data points in the next transform. \n",
    "\n",
    "We follow a similar pattern from before: we create a Python function to perform our element-wise computation. However you may notice something new here. We `yield` instead of `return` at the end of our function. We do this because we want to return a generator instead of a single element. But why? Note that `create_source_key` does not return anything in the case that we did not assign a score above! So we either return nothing or a generator with a single element. We then add the transform to the pipeline using `beam.FlatMap`. `FlatMap` is perfect for any non-1:1 transform such as `create_source_key`; `FlatMap` expects the function being applied to return a generator and it will manage cycling through the generator when the PCollection is passed to the next transform. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_source_key(tweet):\n",
    "    \n",
    "    if tweet['score']:\n",
    "        yield (tweet['source'], {'score': tweet['score'], 'magnitude': tweet['magnitude']})\n",
    "        \n",
    "prepped_tweets = windowed_tweets | \"Create_Source_Key\" >> beam.FlatMap(create_source_key)"
   ]
  },
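  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Conceptually, `FlatMap` calls the function on every element and flattens whatever each call yields into one output collection. The sketch below imitates that behavior without Beam, using `itertools.chain`. The function `keyed_if_scored` is a simplified, hypothetical stand-in for `create_source_key`, and the two records are made up.\n",
    "\n",
    "```python\n",
    "from itertools import chain\n",
    "\n",
    "def keyed_if_scored(tweet):\n",
    "    # Yield one (key, value) pair for scored tweets and nothing otherwise.\n",
    "    if tweet['score'] is not None:\n",
    "        yield (tweet['source'], {'score': tweet['score'], 'magnitude': tweet['magnitude']})\n",
    "\n",
    "# Made-up records: the second one has no sentiment score.\n",
    "records = [\n",
    "    {'source': 'web', 'score': 0.5, 'magnitude': 0.9},\n",
    "    {'source': 'ios', 'score': None, 'magnitude': None},\n",
    "]\n",
    "\n",
    "# Flatten the per-element generators into a single list of pairs.\n",
    "pairs = list(chain.from_iterable(keyed_if_scored(r) for r in records))\n",
    "print(pairs)\n",
    "```\n",
    "\n",
    "Two inputs produce a single output pair, which is exactly the non-1:1 behavior that `beam.Map` cannot express."
   ]
  },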
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "ib.show(prepped_tweets)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we are ready to perform our aggregation. We will combine a weighted average of scores per window and per source. We will use the magnitude as our weight for the weighted average. However, there is not a built-in transform for performing this task!\n",
    "\n",
    "We will create our own custom combiner by extending `beam.CombineFn`. We need to define four functions when extending `beam.CombineFn` to create our custom combiner:\n",
    "1. `create_accumulator`: We initialize the information we will be passing from node to node. In our case we have an ordered pair (sum, count) where sum is the running sum of weighted scores.\n",
    "2. `add_input`: When we wish to include a new data point, how is it incorporated? We will add the magnitude times the score to the sum and increment the count by 1.\n",
    "3. `merge_accumulators`: We will be computing the accumulators where they live in the cluster, what do we do when we need to shuffle data for the final aggregation? This is why we are passing ordered pairs instead of averages, we can simple combine the sums and the counts.\n",
    "4. `extract_output`: This is the function that computes the final output. We finally combine our final weighted average by dividing the sum by the count. However, we need to anticipate the case that the count is 0 (as initally set). In this case, we will set the score to be `NaN`.\n",
    "\n",
    "Once we have created our custom combiner, we can apply it in our pipeline by calling `beam.CombinePerKey`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class WeightedAverageFn(beam.CombineFn):\n",
    "    def create_accumulator(self):\n",
    "        return (0.0, 0)\n",
    "\n",
    "    def add_input(self, sum_count, input):\n",
    "        sum, count = sum_count\n",
    "        return sum + input['score'] * input['magnitude'], count + 1\n",
    "\n",
    "    def merge_accumulators(self, accumulators):\n",
    "        sums, counts = zip(*accumulators)\n",
    "        return sum(sums), sum(counts)\n",
    "\n",
    "    def extract_output(self, sum_count):\n",
    "        sum, count = sum_count\n",
    "        return {'score': sum / count, 'count': count} if count else {'score':float('NaN'), 'count': 0}\n",
    "\n",
    "aggregated_tweets = prepped_tweets | \"Aggregate_Weighted_Score\" >> beam.CombinePerKey(WeightedAverageFn())"
   ]
  },
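  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To build intuition for how the four methods cooperate, the sketch below traces the same arithmetic by hand with plain functions instead of a `beam.CombineFn`. The two \"workers\" and their inputs are hypothetical; in a real job Beam decides how elements are distributed and when accumulators are merged.\n",
    "\n",
    "```python\n",
    "def create_accumulator():\n",
    "    return (0.0, 0)\n",
    "\n",
    "def add_input(acc, value):\n",
    "    total, count = acc\n",
    "    return total + value['score'] * value['magnitude'], count + 1\n",
    "\n",
    "def merge_accumulators(accs):\n",
    "    totals, counts = zip(*accs)\n",
    "    return sum(totals), sum(counts)\n",
    "\n",
    "def extract_output(acc):\n",
    "    total, count = acc\n",
    "    return total / count if count else float('nan')\n",
    "\n",
    "# Two hypothetical workers each fold their own share of the inputs.\n",
    "worker_a = add_input(create_accumulator(), {'score': 0.5, 'magnitude': 2.0})\n",
    "worker_b = add_input(create_accumulator(), {'score': 0.25, 'magnitude': 4.0})\n",
    "\n",
    "# The partial (sum, count) pairs are merged before the final division.\n",
    "merged = merge_accumulators([worker_a, worker_b])\n",
    "print(merged, extract_output(merged))\n",
    "```\n",
    "\n",
    "Because only sums and counts travel between workers, the merge step is a plain addition and the division happens exactly once at the end."
   ]
  },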
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let us take a quick peek at the output of our aggregations"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "ib.show(aggregated_tweets, include_window_info=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We're almost there! Let us just clean up our output to put it into a more convenient form for loading into BigQuery."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def parse_aggregation(agg_tweets):\n",
    "    \n",
    "    result = {}\n",
    "    \n",
    "    result['source'] = agg_tweets[0]\n",
    "    result['score'] = agg_tweets[1]['score']\n",
    "    result['count'] = agg_tweets[1]['count']\n",
    "    \n",
    "    return result\n",
    "\n",
    "\n",
    "parsed_aggregated_tweets = aggregated_tweets | \"Parse_Aggregated_Results\" >> beam.Map(parse_aggregation)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "ib.show(parsed_aggregated_tweets,include_window_info=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We have created all of the transforms for our pipeline and we are ready to start analyzing and processing the entire real-time stream (versus working with a small in-memory PCollection). We will wrap up by defining two transforms to load data into BigQuery. We will load the aggregated tweet data (`parsed_aggregated_tweets`) and the unaggregated, analyzed tweets to a different table (`analyzed_tweets`). Keeping the unaggregated, analyzed tweets will allow us to go back and further analyze the individual tweets if another question arises without having to reprocess. Of course, we are paying to store the tweets in BigQuery, but this is much cheaper than having to reprocess."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "table_spec_unagg = bigquery.TableReference(\n",
    "      projectId = PROJECT_ID,\n",
    "      datasetId = OUTPUT_DATASET,\n",
    "      tableId= OUTPUT_TABLE_UNAGG)\n",
    "   \n",
    "table_schema_unagg ='text:STRING, created_at:TIMESTAMP, source:STRING, language:STRING, lang_confidence:FLOAT64, score:FLOAT64, magnitude:FLOAT64'\n",
    "\n",
    "bq_output_unagg = analyzed_tweets | 'WriteToBQ_Unagg'>> beam.io.WriteToBigQuery(table_spec_unagg,\n",
    "                                                                           schema=table_schema_unagg,\n",
    "                                                                           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,\n",
    "                                                                           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)\n",
    "\n",
    "table_spec_agg = bigquery.TableReference(\n",
    "      projectId = PROJECT_ID,\n",
    "      datasetId = OUTPUT_DATASET,\n",
    "      tableId= OUTPUT_TABLE_AGG)\n",
    "\n",
    "\n",
    "table_schema_agg ='source:STRING, score:FLOAT64, count:INT64, window_start:TIMESTAMP'\n",
    "\n",
    "\n",
    "bq_output_agg = parsed_aggregated_tweets | 'WriteToBQ_Agg'>> beam.io.WriteToBigQuery(table_spec_agg,\n",
    "                                                                                  schema=table_schema_agg,\n",
    "                                                                                  write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,\n",
    "                                                                                  create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)\n",
    "\n",
    "\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we can finally go back and look at our completed graph. Note that by applying `bq_output_unagg` to `analyzed_tweets` we have created a branch in the pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "ib.show_graph(p)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Everything is ready for deploying to Dataflow! We will use the `nbconvert` tool to export this Jupyter Notebook into a Python script, so we can execute the script in other environments without having to install a tool to run notebooks. The cells that were flagged as `NoExport` will not be included in the script. These were cells that used the interactive runner or cells used to work within the notebook environment that we don't need when submitting to Dataflow.\n",
    "\n",
    "The final cell of the notebook includes the `p.run()` call that we need to execute the pipeline on Dataflow. You do not need to run that cell within the notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "!jupyter nbconvert --to script --RegexRemovePreprocessor.patterns=\"['# NoExport']\" TweetPipeline.ipynb"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let us go ahead and submit the job to Dataflow! We will do this by using executing the Python script we just created. After you run the cell be sure to check out the job running in Dataflow and the output in your BigQuery dataset."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NoExport\n",
    "!pip install apache_beam google-cloud-language google-cloud-translate google-apitools\n",
    "!echo \"google-cloud-translate==2.0.1\" > requirements.txt\n",
    "!python3 TweetPipeline.py --save_main_session --requirements_file requirements.txt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Don't run this cell within the notebook!\n",
    "logging.getLogger().setLevel(logging.INFO)\n",
    "p.run()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    " Copyright 2020 Google Inc. All Rights Reserved.\n",
    "\n",
    " Licensed under the Apache License, Version 2.0 (the \"License\");\n",
    " you may not use this file except in compliance with the License.\n",
    " You may obtain a copy of the License at\n",
    "\n",
    "     http://www.apache.org/licenses/LICENSE-2.0\n",
    "\n",
    " Unless required by applicable law or agreed to in writing, software\n",
    " distributed under the License is distributed on an \"AS IS\" BASIS,\n",
    " WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
    " See the License for the specific language governing permissions and\n",
    " limitations under the License."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.18"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
